/*******************************************************************************
 * Package: com.song.kkxxpoi
 * Type:    Mybatis
 * Date:    2023-03-19 17:44
 *
 * Copyright (c) 2023 HUANENG GUICHENG TRUST CORP.,LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.kkxxpoi;

import com.google.common.collect.Lists;
import com.song.kkxxpoi.entity.IntEndScaleForecastEntity;
import com.song.kkxxpoi.mapper.IIntEndScaleForecastMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;

import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Integration tests that compare a single-threaded batch insert with a multi-threaded,
 * all-or-nothing batch insert into {@code int_end_scale_forecast} through MyBatis.
 *
 * <p>SQL that is handy when running these tests by hand:
 * <pre>
 * SELECT count(*) FROM int_end_scale_forecast;
 *
 * TRUNCATE TABLE int_end_scale_forecast;
 *
 * INSERT INTO `person`.`int_end_scale_forecast`(`ID`, `SUB_PROJECT_CODE`, `SUB_PROJECT_NAME`, `START_DATE`, `MONEY`, `END_DATE`, `CREATE_BY`, `CREATE_TIME`, `UPDATE_BY`, `UPDATE_TIME`, `SCALE_CHANGE_TYPE`) VALUES ('1222', 'sxy', 'sxy', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
 * </pre>
 *
 * @author Songxianyang
 * @date 2023-03-19 17:44
 */
@Slf4j
@SpringBootTest
public class Mybatis {

    /**
     * Number of synthetic rows generated per test. Any value that is not a multiple of
     * {@link #ROWS_PER_INSERT} (for example 40001) makes {@code batchSaveTest} throw on the
     * short final batch, which exercises the "roll everything back" path.
     */
    private static final int TOTAL_ROWS = 40000;

    /** Rows handled by one async task, both for conversion and for one worker's transaction. */
    private static final int ROWS_PER_WORKER = 10000;

    /** Rows passed to the mapper in one {@code batchSaveMainProject} call inside a worker. */
    private static final int ROWS_PER_INSERT = 1000;

    @Resource
    private IIntEndScaleForecastMapper iIntEndScaleForecastMapper;

    @Resource
    private SqlSessionFactory sqlSessionFactory;

    @Autowired
    private TransactionDefinition transactionDefinition;
    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;

    /**
     * Reads every row through a mapper obtained from a manually opened {@link SqlSession}
     * and logs the result.
     */
    @Test
    void list() {
        // The session is opened by hand, so it must also be closed by hand.
        try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
            IIntEndScaleForecastMapper mapper = sqlSession.getMapper(IIntEndScaleForecastMapper.class);
            List<IntEndScaleForecastEntity> entities = mapper.listAll();
            log.info("{}", entities);
        }
    }

    /**
     * Single-threaded baseline: inserts all generated rows with one mapper call and logs the
     * row count before and after together with the elapsed time.
     */
    @Test
    void batchSave() {
        List<IntEndScaleForecastEntity> before = iIntEndScaleForecastMapper.listAll();
        log.info("batchSaveTe: <<<<<size::::{}", before.size());

        Instant start = Instant.now();
        List<IntEndScaleForecastEntity> rows = getIntEndScaleForecastEntities();
        iIntEndScaleForecastMapper.batchSaveMainProject(rows);
        Instant end = Instant.now();
        // The original author noted roughly 3575ms for this path.
        log.info(">>>查询data耗时:{}ms", Duration.between(start, end).toMillis());

        List<IntEndScaleForecastEntity> after = iIntEndScaleForecastMapper.listAll();
        log.info("batchSaveTe: <<<<<list2::::{}", after.size());
    }

    /**
     * Builds {@link #TOTAL_ROWS} synthetic entities that stand in for data received from an
     * upstream big-data source. Each entity gets a random UUID id plus an index-based name
     * and code.
     *
     * @return a new mutable list of generated entities
     */
    private List<IntEndScaleForecastEntity> getIntEndScaleForecastEntities() {
        List<IntEndScaleForecastEntity> rows = new ArrayList<>(TOTAL_ROWS);
        for (int i = 0; i < TOTAL_ROWS; i++) {
            IntEndScaleForecastEntity entity = new IntEndScaleForecastEntity();
            entity.setSubProjectName(i + "项目");
            entity.setSubProjectCode(i + "code");
            entity.setId(UUID.randomUUID().toString());
            rows.add(entity);
        }
        return rows;
    }

    /**
     * Simulates an expensive object conversion by processing the generated rows in parallel
     * chunks of {@link #ROWS_PER_WORKER}.
     *
     * <p>Each async task stamps its chunk and appends it to a shared
     * {@link CopyOnWriteArrayList}; the order of chunks in the result therefore depends on
     * task completion order.
     *
     * @return the converted entities, available once every task has finished
     */
    private List<IntEndScaleForecastEntity> beanConvert() {
        // Thread-safe sink shared by all conversion tasks.
        List<IntEndScaleForecastEntity> assetList = new CopyOnWriteArrayList<>();
        List<IntEndScaleForecastEntity> source = getIntEndScaleForecastEntities();
        List<List<IntEndScaleForecastEntity>> chunks = Lists.partition(source, ROWS_PER_WORKER);

        List<CompletableFuture<Void>> tasks = new ArrayList<>(chunks.size());
        for (List<IntEndScaleForecastEntity> chunk : chunks) {
            tasks.add(CompletableFuture.runAsync(() -> {
                for (IntEndScaleForecastEntity entity : chunk) {
                    // Placeholder for the real conversion work.
                    entity.setCreateBy("宋先阳");
                    entity.setUpdateBy("宋先阳");
                }
                assetList.addAll(chunk);
            }));
        }

        // Block until every conversion task has completed.
        CompletableFuture<?>[] pending = tasks.toArray(new CompletableFuture[0]);
        CompletableFuture.allOf(pending).join();
        return assetList;
    }

    /**
     * Multi-threaded insert: splits the converted rows into chunks of
     * {@link #ROWS_PER_WORKER}, saves each chunk on its own thread in its own transaction,
     * and commits only if every worker succeeded.
     *
     * <p>Workers run on a dedicated pool with one thread per chunk. They block on a
     * {@link CyclicBarrier}, so every worker needs its own thread for the barrier to trip;
     * a shared pool with fewer threads than chunks would never release them.
     */
    @Test
    void batchSaveThread() {
        List<IntEndScaleForecastEntity> before = iIntEndScaleForecastMapper.listAll();
        log.info("batchSaveTe: <<<<<size::::{}", before.size());
        Instant start = Instant.now();
        // Upstream data -> entity conversion (the potentially slow part).
        List<IntEndScaleForecastEntity> converted = beanConvert();

        List<List<IntEndScaleForecastEntity>> chunks = Lists.partition(converted, ROWS_PER_WORKER);

        // Released once every worker has finished its commit or rollback.
        CountDownLatch count = new CountDownLatch(chunks.size());
        // Holds each worker after its inserts until all workers have reported in.
        CyclicBarrier barrier = new CyclicBarrier(chunks.size());
        // Set by any worker whose insert failed; read by all workers after the barrier.
        AtomicBoolean rollback = new AtomicBoolean(false);

        ExecutorService workers = Executors.newFixedThreadPool(chunks.size());
        try {
            List<CompletableFuture<Void>> tasks = new ArrayList<>(chunks.size());
            for (List<IntEndScaleForecastEntity> chunk : chunks) {
                tasks.add(CompletableFuture.runAsync(
                        () -> batchSaveTest(chunk, rollback, barrier, count), workers));
            }
            // Wait until every worker has completed its transaction.
            count.await();
            // Surface any exception a worker threw instead of losing it in the future.
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Interrupt workers that may still be parked at the barrier.
            workers.shutdownNow();
            throw new RuntimeException(e);
        } finally {
            workers.shutdown();
        }
        Instant end = Instant.now();
        // The original author noted roughly 1471ms for this path.
        log.info(">>>查询data耗时:{}ms", Duration.between(start, end).toMillis());

        List<IntEndScaleForecastEntity> afterViaInjectedMapper = iIntEndScaleForecastMapper.listAll();
        log.info("batchSaveTe: <<<<<list2::::{}", afterViaInjectedMapper.size());

        // Read again through a fresh session, closing it when done.
        try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
            IIntEndScaleForecastMapper mapper = sqlSession.getMapper(IIntEndScaleForecastMapper.class);
            List<IntEndScaleForecastEntity> afterViaFreshSession = mapper.listAll();
            log.info("batchSaveTe: <<<<<list3::::{}", afterViaFreshSession.size());
        }
    }

    /**
     * Worker body: inserts {@code list} in batches of {@link #ROWS_PER_INSERT} inside a
     * programmatic transaction, waits at {@code barrier} for the other workers, then commits
     * or rolls back depending on the shared {@code rollback} flag.
     *
     * <p>A batch whose size differs from {@link #ROWS_PER_INSERT} deliberately raises an
     * exception so that the rollback path can be demonstrated.
     *
     * <p>Note that the final commits are independent of one another: if one worker's commit
     * fails after the barrier, other workers may already have committed.
     *
     * @param list     the rows this worker is responsible for
     * @param rollback shared flag; set to {@code true} when any worker fails
     * @param barrier  barrier all workers wait on before deciding commit vs rollback
     * @param count    latch counted down once this worker's transaction has been completed
     */
    private void batchSaveTest(List<IntEndScaleForecastEntity> list, AtomicBoolean rollback, CyclicBarrier barrier, CountDownLatch count) {
        String threadName = Thread.currentThread().getName();
        TransactionStatus transaction = null;
        try {
            try {
                // Starting the transaction is inside the try so that a failure here still
                // reaches the barrier and the latch instead of stranding the other threads.
                transaction = dataSourceTransactionManager.getTransaction(transactionDefinition);
                for (List<IntEndScaleForecastEntity> entities : Lists.partition(list, ROWS_PER_INSERT)) {
                    if (entities.size() != ROWS_PER_INSERT) {
                        throw new RuntimeException("我出现异常是否全部回滚！");
                    }
                    iIntEndScaleForecastMapper.batchSaveMainProject(entities);
                }
            } catch (Exception e) {
                // One failing worker means every worker must roll back.
                log.error("Batch insert failed on thread {}; flagging all workers for rollback", threadName, e);
                rollback.set(true);
            }

            log.info("线程{}，被挡住", threadName);
            try {
                // Wait until every worker has finished inserting (or failed).
                barrier.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                rollbackIfOpen(transaction);
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                rollbackIfOpen(transaction);
                throw new RuntimeException(e);
            }

            if (rollback.get()) {
                rollbackIfOpen(transaction);
                log.info("所有线程数据都回滚，线程名字》》》{}", threadName);
                return;
            }
            log.info("所有线程提交事务，线程名字》》》{}", threadName);
            dataSourceTransactionManager.commit(transaction);
        } finally {
            // Signal the main thread only after this worker's transaction is finished.
            count.countDown();
        }
    }

    /** Rolls back {@code transaction} if it was started and has not been completed yet. */
    private void rollbackIfOpen(TransactionStatus transaction) {
        if (transaction != null && !transaction.isCompleted()) {
            dataSourceTransactionManager.rollback(transaction);
        }
    }
}
